package com.xuzm.hdfs;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.*;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;

import java.io.IOException;

public class HdfsDemo {
    // 可操作HDFS文件系统的对象
    FileSystem hdfs = null;

    // 测试方法执行前执行，用于初始化操作，避免频繁初始化
    @Before
    public void init() throws IOException {
        // 构造一个配置参数对象，设置一个参数：要访问的HDFS的URI
        Configuration conf = new Configuration();
        conf.set("dfs.replication", "3");
        conf.set("fs.hdfs.impl", "org.apache.hadoop.hdfs.DistributedFileSystem");
        // 指定使用HDFS访问
        conf.set("fs.defaultFS","hdfs://hadoop-master:9000");
        // 进行客户端身份的设置(root为虚拟机的用户名，hadoop集群节点的其中一个都可以)
        System.setProperty("HADOOP_USER_NAME","xu");
        // 通过FileSystem的静态get()方法获取HDFS文件系统客户端对象
        hdfs = FileSystem.get(conf);
    }

    // 测试方法执行后执行，用于处理结尾的操作，关闭对象
    @After
    public void close() throws IOException {
        // 关闭文件操作对象
        hdfs.close();
    }

    @Test
    public void testUploadFileToHDFS() throws IOException {
        // 待上传的文件路径(windows)
        Path src = new Path("D:\\produce\\workspace\\scala-work\\first\\input\\1.txt");
        // 上传之后存放的路径(HDFS)
        Path dst = new Path("/input/");
        // 上传
        hdfs.copyFromLocalFile(src,dst);
        System.out.println("上传成功");
    }

    @Test
    public void testDownFileToLocal() throws IOException {
        // 待下载的路径(HDFS)
        Path src = new Path("/output/part-00000");
        // 下载成功之后存放的路径(windows)
        Path dst = new Path("C:\\Users\\86159\\Desktop");
        // 下载
        hdfs.copyToLocalFile(false,src,dst,true);
        System.out.println("下载成功");
    }

    @Test
    public void testMkdirFile() throws IOException {
        // 待创建目录路径
        Path src = new Path("/input");
        // 创建目录
        hdfs.mkdirs(src);
        System.out.println("创建成功");
    }

    @Test
    public void testRenameFile() throws IOException {
        // 重命名之前的名字
        Path src = new Path("/HDFS");
        // 重命名之后的名字
        Path dst = new Path("/HDFS1");
        // 重命名
        hdfs.rename(src,dst);
        System.out.println("重命名成功");
    }

    @Test
    public void testDeleteFile() throws IOException {
        // 待删除目录路径(HDFS)
        Path src = new Path("/HDFS1");
        // 删除
        hdfs.delete(src,true);
        System.out.println("删除成功");
    }

    @Test
    public void testCheckFile() throws IOException {
        // 获取迭代器对象("/"表示获取全部目录下的文件)
        RemoteIterator<LocatedFileStatus> listFiles = hdfs.listFiles(new Path("/"), true);
        while (listFiles.hasNext()) {
            LocatedFileStatus fileStatus = listFiles.next();
            // 打印当前文件名
            System.out.println("文件名：" + fileStatus.getPath().getName());
            // 打印当前文件块大小
            System.out.println("文件块大小：" + fileStatus.getBlockSize());
            // 打印当前文件权限
            System.out.println("文件权限：" + fileStatus.getPermission());
            // 打印当前文件内容的长度
            System.out.println("文件内容长度：" + fileStatus.getLen());
            // 获取该文件块的信息（包含长度，数据块，DataNodes的信息）
            BlockLocation[] blockLocations = fileStatus.getBlockLocations();
            for (BlockLocation bl : blockLocations) {
                System.out.println("block-length:" + bl.getLength());
                System.out.println("block-offset:" + bl.getOffset());
                // 获取DataNodes的主机名
                String[] hosts = bl.getHosts();
                for (String host : hosts) {
                    System.out.println(host);
                }
            }
            System.out.println("-----------------分割线-----------------");
        }
    }
}
